ClarkTech Expansion — Design

Purpose
- Formalize in-memory architecture recommendations for RF SCYTHE and the Operator Session Manager.
- Provide a concise design reference for implementers.

Scope
- RF hypergraph (nodes, hyperedges)
- Operator Session Manager (sessions, rooms, SSE/WebSocket streaming)
- AIS playback, Nmap, nDPI, and cross-domain correlation

Goals
- Reduce latency on hot paths
- Improve scalability and multi-process deployment
- Preserve provenance, auditability, and replayability
- Provide a migration path from current SQLite/dict-based approach

Architecture Overview
- Domains: RF, Network (Nmap/nDPI), Maritime (AIS), Operator (sessions/rooms)
- Live state: memory-resident canonical graph (Hypergraph Engine)
- Hot store: dedicated in-memory KV (recommended: Redis or Dragonfly)
- Event store: append-only in-memory stream (Redis Streams or ring buffer)
- Persistence: periodic snapshot + write-behind to SQLite (WAL) or long-term store

Key Components
1. Hypergraph Engine (single source of truth)
   - Nodes: unified model for RF emitters, hosts, vessels, operators, rooms
   - Hyperedges: explicit relationship objects connecting 2+ nodes
   - Indices: node->edges, type index, service/subnet indices
   - Spatial/frequency plug-ins for proximity queries

2. In-Memory KV Store
   - Sessions as TTL keys
   - Room membership as sets
   - Entity cache as hashes
   - Pub/Sub or Streams for event fanout and replay

3. Event Store
   - Append-only ring buffer or Redis Streams
   - Retain recent events for replay to late joiners
   - Feed write-behind persistence pipeline for audit

4. Spatial & Frequency Indices
   - KD-tree (cKDTree or BallTree) for lat/lon queries
   - Frequency buckets for band queries
   - Rebuild incrementally or lazily (dirty flag)

5. Write-behind Persistence
   - Queue write operations and batch to SQLite using WAL
   - Periodic snapshot of Hypergraph Engine state
   - Upserts for room_entities, sessions, operators

Concurrency & Coordination
- Prefer process-global in-memory store (Redis) for true multi-process scaling
- Atomic operations via Redis primitives reduce locking complexity
- If single-process Python is retained: use RLock for domain-partitioned locks
- Backpressure: event store + consumer(s) should support pausing and replay

Durability & Failure Recovery
- Snapshot cadence: configurable (e.g., 30s–5m based on scale)
- Event stream retention: keep enough events to replay recent history (e.g., 10k events)
- On startup: load last snapshot, then replay stream tail to reach current state
- Conflict resolution: last-write-wins for non-critical fields; application-level merge for provenance-sensitive changes

Migration Plan (high level)
1. Introduce Redis for sessions and Pub/Sub; keep SQLite as source-of-record.
2. Implement a Redis-backed session store and TTL keys; switch heartbeat lookups to Redis.
3. Add Redis Streams for entity events; wire SSE/WebSocket producers to publish there.
4. Implement Hypergraph Engine in memory; populate from existing RF hypergraph store.
5. Replace in-process dicts gradually with reads from the Hypergraph Engine.
6. Add snapshotting & write-behind to persist engine state to SQLite.

Operational Considerations
- Monitoring: expose metrics for spatial index size, stream lag, snapshot latency
- Testing: unit tests for hypergraph ops and integration tests for replay and snapshot
- Security: session tokens should be HMAC-bound and rotated, secure password hashing (per-user salt)

Appendices
- Redis keyspace suggestions (in Implementation doc)
- Minimal API guidance for producers/consumers (publish, subscribe, replay)

Next steps
- Implement a minimal Hypergraph Engine prototype and unit tests (see Implementation doc).

Executive Summary

Goal: move hot mutable state out of plain Python dicts/queues into deliberate in‑memory DB patterns to improve concurrency, persistence, and multi‑process scaling for rf_scythe_api_server.py and operator_session_manager.py.
Primary tech choices: Redis (or Dragonfly) for KV, Pub/Sub, Streams; optional RedisGraph/Memgraph for hypergraph; write‑behind to SQLite for durable persistence.
High‑Impact Recommendations

KV Store for Hot Paths:

Why: atomic ops, TTL, cross-process state.
Map: RF nodes/edges → Redis Hashes; AIS positions → Sorted Sets by timestamp; sessions → TTL keys (session:<id>); room membership → Sets (room:<id>:members); global entity cache → Hash (entity:<id>).
Files to change: rf_scythe_api_server.py, operator_session_manager.py.
Append‑Only In‑Memory Event Store:

Why: replay for late SSE/WebSocket joiners, backpressure handling, consistent provenance.
How: Redis Streams or an in‑process ring buffer with capped size. Publish events to stream; clients read via consumer groups or XREAD. Use stream IDs as sequence_id in EntityEvent. Persist asynchronously to entity_audit_log.
Spatial & Frequency Indices (in‑memory):

Why: O(log n) proximity queries and fast frequency lookups.
How: Maintain KD‑tree (SciPy/BallTree) built incrementally from Redis snapshot OR store 3D cartesian coordinates in Redis + local KD‑tree caches invalidated incrementally. Maintain frequency→node sets in Redis for band queries.
Write‑Behind Cache for SQLite:

Why: eliminate DB synchronous I/O on hot writes (sessions, room joins, entity updates).
How: keep authoritative state in Redis; enqueue change events to a dedicated worker (in‑process thread or separate worker) that batches writes into SQLite WAL. Ensure ordering for audit logs.
Replace per‑client Python Queues with Pub/Sub/Streams:

Why: scale to many clients, support ordering and replay.
How: Publish EntityEvents to Redis Pub/Sub or Streams. SSE/WebSocket handlers become lightweight consumers; SSE clients can XREAD from last seen ID to replay missed events.
Materialized Views in Memory:

Why: avoid repeated O(n) recomputations (centrality, counts).
How: maintain counters and cached aggregates in Redis (HASH or simple counters) updated transactionally on writes (e.g., INCR/DECR). Use these for endpoints that currently scan all entities.
Session Tokens with TTL + Cryptographic Binding:

Why: reduced latency for validation, automatic expiry.
How: store session:<token> → operator_id with Redis TTL. Optionally store HMAC or sign token (HMAC-SHA256) to bind token to operator and detect tampering.
AIS Playback as Time‑Series in Memory:

Why: efficient scrubbing, multi‑operator playback sync.
How: per‑MMSI ring buffer in memory (Redis lists capped with LPUSH + LTRIM) or RedisTimeSeries module; keep a global playback pointer key for synchronized play.
Hypergraph as In‑Memory Graph DB:

Why: faster centrality, neighbor, and hyperedge membership queries.
How: adjacency lists in Redis (Sets) and per‑node metadata in Hash. For richer graph queries, evaluate RedisGraph or Memgraph. Precompute degrees and small centrality measures and store as fields.
Partition Memory by Domain:

Why: reduce lock contention and separate TTL/indices.
How: use key namespaces: rf:*, network:*, ais:*, operator:*. Use separate Redis logical databases or prefixes for clarity and different eviction policies.
Practical Migration & Rollout Plan (minimal friction)

Phase 0 — Non‑breaking prototypes: add optional Redis backend toggle in both modules; keep current dict/SQLite codepaths as fallback.
Phase 1 — Sessions & Pub/Sub: migrate session token lookup to Redis TTL keys and switch SSE/WebSocket fan‑out to Redis Pub/Sub/Streams.
Phase 2 — Event store & write‑behind: introduce Redis Streams for events, attach an async persistence worker to drain streams into SQLite audit logs.
Phase 3 — AIS time‑series & indices: move AIS playback into Redis lists/TimeSeries and implement incremental KD‑tree caches.
Phase 4 — Hypergraph: migrate adjacency lists into Redis Sets and evaluate RedisGraph for advanced queries.
Phase 5 — Optimization & autoscaling: separate domain services, run multiple worker processes connected to the same Redis instance.
Small concrete snippets / patterns

Redis session TTL pattern:
Key: session:<token> → value: JSON {operator_id, session_id}; TTL = session_timeout. Validate with GET.
Redis Streams for events:
XADD events stream: XADD events * event_type UPDATE entity_id ... provenance ...
Clients: XREADGROUP / XREAD with last ID for replay.
Write‑behind worker (pseudo):
pop batch from a Redis list / stream; write to SQLite with single transaction; ACK stream ids.
Operational Notes & Tradeoffs

Consistency: Redis is eventual unless you use Lua scripts for multi-key atomicity. Use Lua transactions for complex multi-key updates.
Durability: use AOF or RDB snapshots for Redis persistence and still persist audit logs to SQLite.
Backpressure: Redis Streams enable consumer groups and flow control—preferred over plain Pub/Sub for replay and durability.
Complex graph queries: RedisGraph adds query power but increases operational complexity; start with Sets + application-side joins.

Benjamin, this is the moment where your whole architecture snaps into place.  
A **unified API layer** is the membrane between the outside world and your living hypergraph — one clean, expressive surface that hides the internal machinery and exposes a single, elegant contract to clients.

Let’s sketch it in a way that feels *Clarktech‑native*, matches your engine’s semantics, and positions you perfectly for Redis‑backed event streaming, room‑scoped subgraphs, and DSL‑driven queries.

---

# **Unified API Layer (Conceptual Overview)**

The API layer becomes a thin, declarative wrapper around the HypergraphEngine:

- **Reads** → always served from the engine  
- **Writes** → always go through the GraphEventBus (which writes to Redis Streams + in‑proc bus)  
- **Queries** → compiled DSL → engine query functions  
- **Subgraphs** → engine snapshot + diff generator  
- **Metrics** → engine.get_metrics()  
- **Operator/Room state** → edges + labels in the engine  

Everything becomes graph‑native.

---

# **1. API Surface (Clean, Unified, Domain‑Agnostic)**

### **Nodes**
```
GET  /api/nodes
GET  /api/nodes/{id}
POST /api/nodes
PATCH /api/nodes/{id}
DELETE /api/nodes/{id}
```

### **Edges**
```
GET  /api/edges
GET  /api/edges/{id}
POST /api/edges
DELETE /api/edges/{id}
```

### **Queries**
```
POST /api/query
POST /api/query/subgraph
POST /api/query/diff
```

### **Metrics**
```
GET /api/metrics
```

### **Operator/Room**
```
POST /api/rooms/{room_id}/join
POST /api/rooms/{room_id}/leave
GET  /api/rooms/{room_id}/subgraph
```

Everything is just nodes + edges + queries.

---

# **2. API Layer Implementation Skeleton**

Below is a clean Pythonic sketch (FastAPI‑style, but framework‑agnostic).

```python
class UnifiedAPI:
    def __init__(self, engine, event_bus, diff_generator, dsl_compiler):
        self.engine = engine
        self.event_bus = event_bus
        self.diff_generator = diff_generator
        self.dsl_compiler = dsl_compiler

    #
    # Node endpoints
    #
    def create_node(self, node_data):
        # Wrap in GraphEvent
        ge = GraphEvent(
            event_type="NODE_CREATE",
            entity_id=node_data.get("id"),
            entity_kind=node_data.get("kind", "entity"),
            entity_data=node_data
        )
        self.event_bus.publish(ge)
        return {"status": "ok", "id": ge.entity_id}

    def get_node(self, node_id):
        return self.engine.get_node(node_id)

    def update_node(self, node_id, patch):
        ge = GraphEvent(
            event_type="NODE_UPDATE",
            entity_id=node_id,
            entity_kind=patch.get("kind", "entity"),
            entity_data=patch
        )
        self.event_bus.publish(ge)
        return {"status": "ok"}

    def delete_node(self, node_id):
        ge = GraphEvent(
            event_type="NODE_DELETE",
            entity_id=node_id,
            entity_kind="entity",
            entity_data={}
        )
        self.event_bus.publish(ge)
        return {"status": "ok"}

    #
    # Edge endpoints
    #
    def create_edge(self, edge_data):
        ge = GraphEvent(
            event_type="EDGE_CREATE",
            entity_id=edge_data.get("id"),
            entity_kind=edge_data.get("kind", "relationship"),
            entity_data=edge_data
        )
        self.event_bus.publish(ge)
        return {"status": "ok", "id": ge.entity_id}

    def delete_edge(self, edge_id):
        ge = GraphEvent(
            event_type="EDGE_DELETE",
            entity_id=edge_id,
            entity_kind="relationship",
            entity_data={}
        )
        self.event_bus.publish(ge)
        return {"status": "ok"}

    #
    # Query endpoints
    #
    def run_query(self, dsl_query):
        predicate = self.dsl_compiler.compile(dsl_query)
        node_ids = [nid for nid, n in self.engine.nodes.items() if predicate.matches_node(n)]
        edge_ids = [eid for eid, e in self.engine.edges.items() if predicate.matches_edge(e)]
        return {
            "nodes": [self.engine.nodes[n] for n in node_ids],
            "edges": [self.engine.edges[e] for e in edge_ids]
        }

    def subgraph(self, dsl_query):
        predicate = self.dsl_compiler.compile(dsl_query)
        return self.engine.extract_subgraph(predicate)

    def subgraph_diff(self, dsl_query, from_seq, to_seq):
        predicate = self.dsl_compiler.compile(dsl_query)
        return self.diff_generator.generate_diff(
            query_id=dsl_query.get("id"),
            query_predicate=predicate,
            from_seq=from_seq,
            to_seq=to_seq
        )

    #
    # Metrics
    #
    def metrics(self):
        return self.engine.get_metrics()
```

This is the **cleanest possible API**:  
- All writes → GraphEventBus  
- All reads → HypergraphEngine  
- All queries → DSL → predicate → engine  
- All diffs → diff generator  

---

# **3. Why this API layer is the right move**

### **A. It enforces the unified graph ontology**
No more domain‑specific endpoints.  
Everything is a node or edge.

### **B. It makes the engine the single source of truth**
No more duplicated stores.  
No more inconsistent views.

### **C. It aligns perfectly with Redis Streams**
Writes → GraphEventBus → XADD  
Reads → engine  
Diffs → XREADGROUP + diff generator

### **D. It makes the UI trivial**
The UI only needs:

- `/api/query/subgraph`  
- `/api/query/diff`  
- SSE/WebSocket stream  

Everything else is just graph events.

### **E. It positions you for multi‑node scaling**
Because the API layer is stateless:

- Multiple API servers  
- Multiple OperatorSessionManager instances  
- One Redis Stream  
- One HypergraphEngine per instance (fed by events)

---

# **4. What I can sketch next**

If you want to keep going, I can lay out:

- A **FastAPI implementation** of this unified layer  
- A **DSL compiler** that maps to your engine’s indices  
- A **subgraph extractor** optimized for room‑scoped queries  
- A **Redis‑backed replay endpoint**  
- A **mission‑aware API namespace**  
- A **zero‑downtime migration plan** from RFHypergraphStore → engine  

Just tell me which direction you want to push next.

Benjamin, let’s do this properly — a **clean, production‑ready FastAPI implementation** of your unified API layer that wraps the HypergraphEngine, emits GraphEvents, and positions you for Redis Streams consumption.

This is the version that feels *inevitable* in your architecture: thin, declarative, graph‑native, and ready for multi‑operator real‑time sync.

Below is a complete, elegant sketch you can drop into your server.

---

# **UnifiedAPI (FastAPI Implementation)**  
### *Graph‑native. Event‑driven. Clean.*

This version assumes:

- `engine: HypergraphEngine`
- `event_bus: GraphEventBus` (dual‑write: in‑proc + Redis Streams)
- `diff_generator: SubgraphDiffGenerator`
- `dsl_compiler: DSLCompiler`

Everything else is just wiring.

---

## **1. FastAPI Setup**

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict, Any

app = FastAPI(title="Unified Hypergraph API")
```

---

## **2. Pydantic Models (API Contracts)**

These models define the external API surface — clean, typed, domain‑agnostic.

```python
class NodeCreate(BaseModel):
    id: Optional[str] = None
    kind: str = "entity"
    position: Optional[List[float]] = None
    frequency: Optional[float] = None
    labels: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None

class NodePatch(BaseModel):
    kind: Optional[str] = None
    position: Optional[List[float]] = None
    frequency: Optional[float] = None
    labels: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None

class EdgeCreate(BaseModel):
    id: Optional[str] = None
    kind: str = "relationship"
    nodes: List[str]
    weight: Optional[float] = 0.0
    labels: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None

class DSLQuery(BaseModel):
    query: str
    query_id: Optional[str] = None

class DiffRequest(BaseModel):
    query: str
    query_id: str
    from_sequence: int
    to_sequence: int
```

---

## **3. Node Endpoints**

### **Create Node**

```python
@app.post("/api/nodes")
def create_node(node: NodeCreate):
    ge = GraphEvent(
        event_type="NODE_CREATE",
        entity_id=node.id,
        entity_kind=node.kind,
        entity_data=node.dict(exclude_none=True)
    )
    event_bus.publish(ge)
    return {"status": "ok", "id": ge.entity_id}
```

### **Get Node**

```python
@app.get("/api/nodes/{node_id}")
def get_node(node_id: str):
    n = engine.get_node(node_id)
    if not n:
        raise HTTPException(404, "Node not found")
    return n
```

### **Patch Node**

```python
@app.patch("/api/nodes/{node_id}")
def patch_node(node_id: str, patch: NodePatch):
    ge = GraphEvent(
        event_type="NODE_UPDATE",
        entity_id=node_id,
        entity_kind=patch.kind or "entity",
        entity_data=patch.dict(exclude_none=True)
    )
    event_bus.publish(ge)
    return {"status": "ok"}
```

### **Delete Node**

```python
@app.delete("/api/nodes/{node_id}")
def delete_node(node_id: str):
    ge = GraphEvent(
        event_type="NODE_DELETE",
        entity_id=node_id,
        entity_kind="entity",
        entity_data={}
    )
    event_bus.publish(ge)
    return {"status": "ok"}
```

---

## **4. Edge Endpoints**

### **Create Edge**

```python
@app.post("/api/edges")
def create_edge(edge: EdgeCreate):
    ge = GraphEvent(
        event_type="EDGE_CREATE",
        entity_id=edge.id,
        entity_kind=edge.kind,
        entity_data=edge.dict(exclude_none=True)
    )
    event_bus.publish(ge)
    return {"status": "ok", "id": ge.entity_id}
```

### **Delete Edge**

```python
@app.delete("/api/edges/{edge_id}")
def delete_edge(edge_id: str):
    ge = GraphEvent(
        event_type="EDGE_DELETE",
        entity_id=edge_id,
        entity_kind="relationship",
        entity_data={}
    )
    event_bus.publish(ge)
    return {"status": "ok"}
```

---

## **5. DSL Query Endpoints**

### **Run Query (raw results)**

```python
@app.post("/api/query")
def run_query(req: DSLQuery):
    predicate = dsl_compiler.compile(req.query)
    nodes = [n for n in engine.nodes.values() if predicate.matches_node(n)]
    edges = [e for e in engine.edges.values() if predicate.matches_edge(e)]
    return {"nodes": nodes, "edges": edges}
```

### **Subgraph Snapshot**

```python
@app.post("/api/query/subgraph")
def subgraph(req: DSLQuery):
    predicate = dsl_compiler.compile(req.query)
    return engine.extract_subgraph(predicate)
```

### **Subgraph Diff**

```python
@app.post("/api/query/diff")
def subgraph_diff(req: DiffRequest):
    predicate = dsl_compiler.compile(req.query)
    return diff_generator.generate_diff(
        query_id=req.query_id,
        query_predicate=predicate,
        from_seq=req.from_sequence,
        to_seq=req.to_sequence
    )
```

---

## **6. Metrics Endpoint**

```python
@app.get("/api/metrics")
def metrics():
    return engine.get_metrics()
```

---

# **7. Why this FastAPI layer is so clean**

Because it:

- Treats **everything** as nodes + edges  
- Pushes **all writes** through GraphEventBus → Redis Streams  
- Pulls **all reads** from HypergraphEngine  
- Makes **queries** DSL‑driven  
- Makes **subgraphs** first‑class  
- Makes **diffs** trivial  
- Makes **OperatorSessionManager** a pure event consumer  
- Makes **UI** a pure subgraph renderer  

This is the exact shape of a modern, event‑sourced, graph‑native API.

---

Benjamin, this is the layer that turns your hypergraph into a **platform**.  
If you want, I can sketch:

- A **DSL compiler**  
- A **subgraph extractor**  
- A **Redis‑backed replay endpoint**  
- A **mission‑room API namespace**  
- Or a **zero‑downtime migration plan** from RFHypergraphStore → engine  

Just tell me where you want to go next.
